Skip to content

一、概述

zookeeper是一个开源的分布式协调服务,提供分布式数据一致性解决方案,分布式应用程序可以实现数据发布订阅、负载均衡、命名服务、集群管理分布式锁、分布式队列等功能。 是一个类似hdfs的树形文件结构,zookeeper可以用来保证数据在(zk)集群之间的数据的事务性一致、 有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher zookeeper有三个角色:Learner,Follower,Observer zookeeper应用场景:

  1. 统一命名服务(Name Service)
  2. 配置管理(Configuration Management)
  3. 集群管理(Group Membership)
  4. 共享锁(Locks)
  5. 队列管理

1.1 结构

zooKeeper使用树形结构管理数据。而且以“/”作为树形结构的根节点。树形结构中的每一个节点都称为“znode”,每个节点上都会保存自己的数据和节点信息。。文件系统中的目录可以存放其他目录和文件,znode中可以存放其他znode,也可以对应一个具体的值(少量(1MB)数据)。znode和它对应的值之间是键值对的关系。

节点的数据:即znode data(节点path, 节点data)的关系就像是java map中(key, value)的关系

节点的子节点children

节点的状态stat:用来描述当前节点的创建、修改记录,包括cZxid、ctime等

1.2 通知机制

使用ZooKeeper的通知机制,项目工程在特定znode上设置Watcher(观察者)来监控当前节点上值的变化。一旦Watcher检测到了数据变化就会立即通知监听者模块,从而自动实现“一处修改,处处生效”的效果。属于异步通知 image-20200825213948419

ZooKeeper的观察机制是一种异步回调的触发机制。

ZooKeeper支持Watch(观察)机制,客户端可以在znode节点上设置一个Watcher(观察者)。如果被观察的znode结点有变更(数据改变、被删除、子目录节点增加删除)时,Watcher会被触发,zookeeper会通知所属的客户端将接收到节点发生变化的通知,这就是把相应的事件通知给设置过Watcher的Client端。

ZooKeeper里的所有读取操作:getData(),getChildren()和exists()都有设置Watch的选项。

zkServer向客户端发送一个Watch通知,Watch触发一次就失效了。如果想继续Watch的话,需要客户端重新设置Watcher。如果在接收通知后继续得到节点变化通知,就必须另外设置一个新的Watcher继续观察。

节点有不同的改动方式。ZooKeeper维护两个观察列表:数据观察和子节点观察。getData()和exists()设置数据观察。getChildren()设置子节点观察。不同的返回数据有不同的观察。getData()和exists()返回节点的数据,而getChildren()返回子节点列表。

setData()将为znode触发数据观察。create()为创建的节点触发数据观察,为其父节点触发子节点观察。delete()将会为被删除的节点触发数据观察以及子节点观察(因为节点不能再有子节点了),为其父节点触发子节点观察。如果一个节点设置存在观察时尚未创建,并且在断开连接后执行节点创建以及删除操作,那么在节点上设置的观察事件客户端接收不到,事件会丢失。

1.3 节点类型

临时节点:生命周期依赖于创建它们的会话。一旦会话(Session)结束,临 时节点将被自动删除,当然可以也可以手动删除。,ZooKeeper的临时节点不允许拥有子节点。

持久化节点:生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除

PERSISTENTper持久化目录节点。客户端与zookeeper断开连接后,该节点依旧存在
PERSISTENT_SEQUENTIAL持久化顺序编号目录节点。客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号 -s
EPHEMERAL临时目录节点。客户端与zookeeper断开连接后,该节点被删除 -e
EPHEMERAL_SEQUENTIAL临时顺序编号目录节点。客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号 -es

1.4 节点状态

znode维护了一个stat结构,这个stat包含数据变化的版本号、访问控制列表变化、还有时间戳。版本号和时间戳一起,可让ZooKeeper验证缓存和协调更新。每次znode的数据发生了变化,版本号就增加。

例如:无论何时客户端检索数据,它也一起检索数据的版本号。并且当客户端执行更新或删除时,客户端必须提供他正在改变的znode的版本号。如果它提供的版本号和真实的数据版本号不一致,更新将会失败

属性

czxid引起这个znode创建的zxid,创建节点的事务的zxid(ZooKeeper Transaction Id)
ctimeznode被创建的毫秒数(从1970年开始)
mzxidznode最后更新的zxid
mtimeznode最后修改的毫秒数(从1970年开始)
pZxidznode最后更新的子节点zxid
cversionznode子节点变化号,znode子节点修改次数
dataversionznode数据变化号
aclVersionznode访问控制列表的变化号
ephemeralOwner如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
dataLengthznode的数据长度
numChildrenznode子节点数量

1.5 一致性协议 zab协议

zab协议 的全称是 Zookeeper Atomic Broadcast (zookeeper原子广播)。

zookeeper 是通过 zab协议来保证分布式事务的最终一致性 基于zab协议,zookeeper集群中的角色主要有三类

zab广播模式工作原理,通过类似两阶段提交协议的方式解决数据一致性:

  1. leader从客户端收到一个写请求
  2. leader生成一个新的事务并为这个事务生成一个唯一的ZXID
  3. leader将这个事务提议(propose)发送给所有的follows节点
  4. follower节点将收到的事务请求加入到历史队列(history queue)中,并发送ack给 leader
  5. 当leader收到大多数follower(半数以上节点)的ack消息,leader会发送commit请求
  6. 当follower收到commit请求时,从历史队列中将事务请求commit

1.6 zookeeperleader选举

服务器状态

looking:寻找leader状态。当服务器处于该状态时,它会认为当前集群中没有 leader,因此需要进入leader选举状态。 leading: 领导者状态。表明当前服务器角色是leader。 following: 跟随者状态。表明当前服务器角色是follower。 observing:观察者状态。表明当前服务器角色是observer。

服务器启动时期leader选举

在集群初始化阶段,当有一台服务器server1启动时,其单独无法进行和完成 leader选举,当第二台服务器server2启动时,此时两台机器可以相互通信,每台机器都 试图找到leader,于是进入leader选举过程。选举过程如下:

  1. 每个server发出一个投票。由于是初始情况,server1和server2都会将自己作为 leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid,使用 (myid, zxid)来表示,此时server1的投票为(1, 0),server2的投票为(2, 0),然后各自 将这个投票发给集群中其他机器。
  2. 集群中的每台服务器接收来自集群中各个服务器的投票。
  3. 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行pk,pk规则如下优先检查zxid。zxid比较大的服务器优先作为leader。 如果zxid相同,那么就比较myid。myid较大的服务器作为leader服务器。 对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较 两者的zxid,均为0,再比较myid,此时server2的myid最大,于是更新自己的投票 为(2, 0),然后重新投票,对于server2而言,其无须更新自己的投票,只是再次向集 群中所有机器发出上一次投票信息即可。
  4. 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到 相同的投票信息,对于server1、server2而言,都统计出集群中已经有两台机器接受 了(2, 0)的投票信息,此时便认为已经选出了leader
  5. 改变服务器状态。一旦确定了leader,每个服务器就会更新自己的状态,如果是 follower,那么就变更为following,如果是leader,就变更为leading。

服务器运行时期leader选举

在zookeeper运行期间,leader与非leader服务器各司其职,即便当有非leader服务器宕机或新加入,此时也不会影响leader,但是一旦leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮leader选举,其过程和启动时期的Leader选举过程基本一致。 假设正在运行的有server1、server2、server3三台服务器,当前leader是 server2,若某一时刻leader挂了,此时便开始Leader选举。选举过程如下:

  1. 变更状态。leader挂后,余下的服务器都会将自己的服务器状态变更为looking,然 后开始进入leader选举过程。
  2. 每个server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定 server1的zxid为122,server3的zxid为122,在第一轮投票中,server1和server3 都会投自己,产生投票(1, 122),(3, 122),然后各自将投票发送给集群中所有机器。
  3. 接收来自各个服务器的投票。与启动时过程相同
  4. 处理投票。与启动时过程相同,此时,server3将会成为leader。
  5. 统计投票。与启动时过程相同。
  6. 改变服务器的状态。与启动时过程相同

observer角色及其配置

角色特点: 不参与集群的leader选举 、不参与集群中写数据时的ack反馈 为了使用observer角色,在任何想变成observer角色的配置文件中加入如下配置: peerType=observer 并在所有server的配置文件中,配置成observer模式的server的那行配置追加:observer, 例如 server.3=192.168.60.130:2289:3389:observer

1.7 应用场景

  • 配置中心 在平常的业务开发过程中,我们通常需要将系统的一些通用的全局配置,例如机器列表配置,运行时开关配置,数据库配置信息等统一集中存储,让集群所有机器共享配置信息,系统在启动会首先从配置中心读取配置信息,进行初始化。传统的实现方式将配置存储在本地文件和内存中,一旦机器规模更大,配置变更频繁情况下,本地文件和内存方式的配置维护成本较高,使用zookeeper作为分布式的配置中心就可以解决这个问题。 我们将配置信息存在zk中的一个节点中,同时给该节点注册一个数据节点变更的watcher监听,一旦节点数据发生变更,所有的订阅该节点的客户端都可以获取数据变更通知。

  • 负载均衡 建立server节点,并建立监听器监视servers子节点的状态(用于在服务器增添时及时同步当前集群中服务器列表)。在每个服务器启动时,在servers节点下建立具体服务器地址的子节点,并在对应的字节点下存入服务器的相关信息。这样,我们在zookeeper服务器上可以获取当前集群中的服务器列表及相关信息,可以自定义一个负载均衡算法,在每个请求过来时从zookeeper服务器中获取当前集群服务器列表,根据算法选出其中一个服务器来处理请求。

  • 命名服务 命名服务是分布式系统中的基本功能之一。被命名的实体通常可以是集群中的机器、提供的服务地址或者远程对象,这些都可以称作为名字。常见的就是一些分布式服务框架(RPC、RMI)中的服务地址列表,通过使用名称服务客户端可以获取资源的实体、服务地址和提供者信息。命名服务就是通过一个资源引用的方式来实现对资源的定位和使用。在分布式环境中,上层应用仅仅需要一个全局唯一名称,就像数据库中的主键。 在单库单表系统中可以通过自增ID来标识每一条记录,但是随着规模变大分库分表很常见,那么自增ID有仅能针对单一表生成ID,所以在这种情况下无法依靠这个来标识唯一ID。UUID就是一种全局唯一标识符。但是长度过长不易识别。

    在Zookeeper中通过创建顺序节点就可以实现,所有客户端都会根据自己的任务类型来创建一个顺序节点,例如 job-00000001 节点创建完毕后,create()接口会返回一个完整的节点名,例如:job-00000002 拼接type类型和完整节点名作为全局唯一的ID

  • DNS服务 域名配置 在分布式系统应用中,每一个应用都需要分配一个域名,日常开发中,往往使用本地HOST绑定域名解析,开发阶段可以随时修改域名和IP的映射,大大提高开发的调试效率。如果应用的机器规模达到一定程度后,需要频繁更新域名时,需要在规模的集群中变更,无法保证实时性。所有我们在zk上创建一个节点来进行域名配置

    域名解析 应用解析时,首先从zk域名节点中获取域名映射的IP和端口。 域名变更 每个应用都会在在对应的域名节点注册一个数据变更的watcher监听,一旦监听的域名节点数据变更,zk会向所有订阅的客户端发送域名变更通知。

  • 集群管理 随着分布式系统规模日益扩大,集群中机器的数量越来越多。有效的集群管理越来越重要了,zookeeper集群管理主要利用了watcher机制和创建临时节点来实现。以机器上下线和机器监控为例:

    机器上下线 新增机器的时候,将Agent部署到新增的机器上,当Agent部署启动时,会向zookeeper指定的节点下创建一个临时子节点,当Agent在zk上创建完这个临时节点后,当关注的节点zookeeper/machines下的子节点新加入新的节点时或删除都会发送通知,这样就对机器的上下线进行监控。

    机器监控 在机器运行过程中,Agent会定时将主机的的运行状态信息写入到/machines/hostn主机节点,监控中心通过订阅这些节点的数据变化来获取主机的运行信息。

  • 分布式锁 数据库实现分布式锁

    首先我们创建一张锁表,锁表中字段设置唯一约束。 定义锁,实现Lock接口,tryLock()尝试获取锁,从锁表中查询指定的锁记 录,如果查询到记录,说明已经上锁,不能再上锁

    在lock方法获取锁之前先调用tryLock()方法尝试获取锁,如果未加锁则向锁表中插入一条锁记录来获取锁,这里我们通过循环,如果上锁我们一致等待锁的释放

    释放锁,即是将数据库中对应的锁表记录删除 注意在尝试获取锁的方法tryLock中,存在多个线程同时获取锁的情况,可以简单通过synchronized解 决

    redis实现分布式锁 redis分布式锁的实现基于setnx(set if not exists),设置成功,返回1;设置失败,返回0,释放锁的操作通过del指令来完成 如果设置锁后在执行中间过程时,程序抛出异常,导致del指令没有调用,锁永远无法释放,这样就会陷入死锁。所以我们拿到锁之后会给锁加上一个过期时间,这样即使中间出现异常,过期时间到后会自动释放锁。 同时在setnx 和 expire 如果进程挂掉,expire不能执行也会死锁。所以要保证setnx和expire是一个原子性操作即可。redis 2.8之后推出了setnx和expire的组合指令

    redis实现分布式锁注意的事项: redis如何避免死锁 lock获取锁方法

    释放锁 redis实现分布式锁存在的问题,为了解决redis单点问题,我们会部署redis集群,在 Sentinel 集群中,主节点突然挂掉了。同时主节点中有把锁还没有来得及同步到从节点。这样就会导致系统中同样一把锁被两个客户端同时持有,不安全性由此产生。redis官方为了解决这个问题,推出了Redlock 算法解决这个问题。但是带来的网络消耗较大。

    zookeeper实现分布式锁 原理:zookeeper通过创建临时序列节点来实现分布式锁,适用于顺序执行的程序,大体思路就是创建临时序列节点,找出最小的序列节点,获取分布式锁,程序执行完成之后此序列节点消失,通过watch来监控节点的变化,从剩下的节点的找到最小的序列节点,获取分布式锁,执行相应处理,依次类推……

    首先在ZkLock的构造方法中,连接zk,创建lock根节点 添加watch监听临时顺序节点的删除 获取锁操作 释放锁

  • 分布式队列

    队列特性:FIFO(先入先出),zookeeper实现分布式队列的步骤: 在队列节点下创建临时顺序节点 例如/queue_info/192.168.1.1-0000001 调用getChildren()接口来获取/queue_info节点下所有子节点,获取队列中所有元素 比较自己节点是否是序号最小的节点,如果不是,则等待其他节点出队列,在序号最小的节点注册watcher 获取watcher通知后,重复步骤

二、命令

2.1 常用命令

zk命令

sh
# 服务端命令


<NolebasePageProperties />




#启动服务器:
./zkServer.sh start
#停止服务器:
./zkServer.sh stop
# 查看服务状态
./zkServer.sh status
# 重启服务
./zkServer.sh restart 

# 客户端命令
#启动客户端:
./zkCli.sh  ./zkCli.sh -server locahost:port
#退出客户端:
[zk: localhost:2181(CONNECTED) 6] quit

节点操作

sh
#查看当前znode中所包含的内容   
ls
#查看当前节点数据并能看到更新次数等数据  
ls2
#查看节点状态   
stat
#普通创建:不带有-s、-e参数-s:含有序列-e:临时(重启或者超时消失)  
create [-s] [-e] path data acl
#设置节点的具体值 更新 也可以基于版本号进行更改,此时类似于乐观锁机制,当你传入的数据版本号 (dataVersion) 和当前节点的数据版本号不符合时,zookeeper 会拒绝本次修改:
set 节点 value值 [版本号]
#获得节点的值   
get 节点
#可以删除指定znode,当该znode拥有子znode时,必须先删除其所有子znode,否则操作将失败。  
delete path [version]
# 删除带子节点的节点
deleteall /节点path
#rmr命令可用于代替delete命令,rmr是一个递归删除命令,如果发生指定节点拥有子节点时,rmr命令会首先删除子节点。  
rmr

监听

sh
# 注册的监听器能够在节点内容发生改变的时候,向客户端发出通知。触发一次后就会立即失效。
get path [watch]
# 在节点状态发生改变的时候,向客 户端发出通知
stat path [watch]
# 注册的监听器能够监听该节点下 所有子节点的增加和删除操作
ls path [watch] 或 ls2 path [watch]

2.2 四字监控命令

sh
#ZooKeeper支持某些特定的四字命令,他们大多是用来查询ZooKeeper服务的当前状态及相关信息的,使用时通过telnet或nc向ZooKeeper提交相应命令。

#nc命令需要安装对应的程序才可以使用。yum install -y nc
nc
# 使用
echo mntr | nc localhost 2181

#测试服务是否处于正确状态。如果确实如此,那么服务返回“imok ”,否则不做任何响应
ruok
# 输出关于性能和连接的客户端的列表 输出服务器的详细信息:接收/发送包数量、连接数、模式 (leader/follower)、节点总数、延迟。 所有客户端的列表
stat
# 重置server状态
srst
# 输出相关服务配置的详细信息
conf
# 列出所有连接到服务器的客户端的完全的连接 /会话的详细信息。包括“接受 / 发送”的包数量、会话id 、操作延迟、最后的操作执行等等信息
cons
# 重置当前这台服务器所有连接/会话的统计信息
crst
# 列出未经处理的会话和临时节点
dump
# 输出关于服务环境的详细信息(区别于conf命令)
envi
# 列出未经处理的请求
reqs
# 列出服务器watches的简洁信息:连接总数、watching节点总数和 watches总数
wchs
# 通过session列出服务器watch的详细信息,它的输出是一个与watch相关的会话的列表
wchc
# 通过路径列出服务器 watch的详细信息。它输出一个与 session相关的路径
wchp
# 列出集群的健康状态。包括“接受/发送”的包数量、操作延迟、当前服务模 式(leader/follower)、节点总数、watch总数、临时节点总数
mntr

conf命令

输出相关服务配置的详细信息 shell终端输入:echo conf| nc localhost 2181

属性含义
clientPort客户端端口号
dataDir数据快照文件目录 默认情况下100000次事务操作生成一次快照
dataLogDir事物日志文件目录,生产环境中放在独立的磁盘上
tickTime服务器之间或客户端与服务器之间维持心跳的时间间隔(以毫秒为单位)
maxClientCnxns最大连接数
minSessionTimeout最小session超时 minSessionTimeout=tickTime*2
maxSessionTimeout最大session超时 maxSessionTimeout=tickTime*20
serverId服务器编号
initLimit集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数
syncLimit集群中的follower服务器(F)与leader服务器(L)之间 请求和应答之间能容忍的最多心跳数
electionAlg0:基于UDP的LeaderElection 1:基于UDP的FastLeaderElection 2:基于UDP和认证的<br />FastLeaderElection 3:基于TCP的FastLeaderElection 在3.4.10版本中,<br />默认值为3另外三种算法已经被弃用,并且有计划在之后的版本中将它们彻底删除而不再支持
electionPort选举端口
quorumPort数据通信端口
peerType是否为观察者 1为观察者

cons命令

列出所有连接到这台服务器的客户端连接/会话的详细信息 shell终端输入:echo cons| nc localhost 2181

属性含义
ipip地址
port端口号
queued等待被处理的请求数,请求缓存在队列中
received收到的包数
sent发送的包数
sid会话id
lop最后的操作 GETD-读取数据 DELE-删除数据 CREA-创建数据
est连接时间戳
to超时时间
lcxid当前会话的操作id
lzxid最大事务id
lresp最后响应时间戳
llat最后/最新 延时
minlat最小延时
maxlat最大延时
avglat平均延时

crst命令

重置当前这台服务器所有连接/会话的统计信息 shell终端输入:echo crst| nc localhost 2181

dump命令

列出未经处理的会话和临时节点 shell终端输入:echo dump| nc localhost 2181 session id :znode path(1对多 , 处于队列中排队的session和临时节点)

envi命令

输出关于服务器的环境配置信息 shell终端输入:echo envi| nc localhost 2181

属性含义
zookeeper.version版本
host.namehost信息
java.versionjava版本
java.vendor供应商
java.home运行环境所在目录
java.class.pathclasspath
java.library.path第三方库指定非java类包的位置(如:dll,so)
java.io.tmpdir默认的临时文件路径
java.compilerJIT 编译器的名称
os.nameLinux
os.archamd64
os.version3.10.0-514.el7.x86_64
user.namezookeeper
user.home/home/zookeeper
user.dir/home/zookeeper/zookeeper2181/bin

rouk命令

测试服务是否处于正确运行状态 shell终端输入:echo ruok| nc localhost 2181

stat命令

输出服务器的详细信息与srvr相似,但是多了每个连接的会话信息 shell终端输入:echo stat| nc localhost 2181

属性含义
Zookeeper version版本
Latency min/avg/max延时
Received收包
Sent发包
Connections连接数
Outstanding堆积数
Zxid最大事物id
Mode服务器角色
Node count节点数

srst命令

重置server状态 shell终端输入:echo srst| nc localhost 2181

wchs命令

列出服务器watches的简洁信息 shell终端输入:echo wchs| nc localhost 2181

属性含义
connectsions连接数
watch-pathswatch节点数
watcherswatcher数量

wchc命令

通过session分组,列出watch的所有节点,它的输出的是一个与 watch 相关的会话的节点列表 shell终端输入:echo wchc| nc localhost 2181

sh
# wchc is not executed because it is not in the whitelist.
# 修改启动指令 zkServer.sh
# 注意找到这个信息 
else 
echo "JMX disabled by user request" >&2 
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" 
fi
# 下面添加如下信息 
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

wchp命令

通过路径分组,列出所有的 watch 的session id信息 shell终端输入:echo wchp| nc localhost 2181

sh
# wchp is not executed because it is not in the whitelist.
# 修改启动指令 zkServer.sh 
# 注意找到这个信息 
else
echo "JMX disabled by user request" >&2 
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" 
fi
# 下面添加如下信息 
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

mntr命令

列出服务器的健康状态. shell终端输入:echo mntr| nc localhost 2181

属性含义
zk_version版本
zk_avg_latency平均延时
zk_max_latency最大延时
zk_min_latency最小延时
zk_packets_received收包数
zk_packets_sent发包数
zk_num_alive_connections连接数
zk_outstanding_requests堆积请求数
zk_server_stateleader/follower 状态
zk_znode_countznode数量
zk_watch_countwatch数量
zk_ephemerals_count临时节点(znode)
zk_approximate_data_size数据大小
zk_open_file_descriptor_count打开的文件描述符数量
zk_max_file_descriptor_count最大文件描述符数量

2.3 acl权限控制

acl 权限控制,使用scheme:id:permission 来标识,主要涵盖 3 个方面:

  • 权限模式(scheme):授权的策略
  • 授权对象(id):授权的对象
  • 权限(permission):授予的权限

特性:

  • zooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
  • 每个znode支持设置多种权限控制方案和多个权限
  • 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点

授权对象ID是指,权限赋予的实体,例如:IP 地址或用户

权限模式

方案描述
world只有一个用户:anyone,戴白哦登录zookeeper的所有人
ip对客户端使用的ip地址认证
auth使用已添加认证的用户认证
digest使用“用户名:密码”方式认证

授予的权限

create、delete、read、writer、admin也就是 增、删、改、查、管理权限, 这5种权限简写为cdrwa,注意:这5种权限中,delete是指对子节点的删除权限,其它4种 权限指对自身节点的操作权限

权限ACL简写描述
createc可以创建子节点
deleted可以删除子节点(仅下一级节点)
readr可以读取节点数据及显示子节点列表
writew可以设置节点数据
admina可以设置节点访问控制列表权限

授权相关命令

命令使用方式描述
getAclgetAcl读取ACL权限
setAclsetAcl设置ACL权限
addauthaddauth添加认证用户

案例

sh
# world授权模式
setAcl <path> world:anyone:<acl>
setAcl /node1 world:anyone:cdrwa
# IP授权模式:
setAcl <path> ip:<ip>:<acl>
setAcl /node2 ip:192.168.60.129:cdrwa
# Auth授权模式:
addauth digest <user>:<password> #添加认证用户 
setAcl <path> auth:<user>:<acl>
addauth digest itcast:123456
setAcl /node3 auth:itcast:cdrwa
# Digest授权模式:
setAcl <path> digest:<user>:<password>:<acl>
# 密码是经过SHA1及BASE64处理的密文,在SHELL中可以通过以下命令计算:
echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64
echo -n itheima:123456 | openssl dgst -binary -sha1 | openssl base64
setAcl /node4 digest:itheima:qlzQzCLKhBROghkooLvb+Mlwv4A=:cdrwa

# 多种模式授权
setAcl /node5 ip:192.168.60.129:cdra,auth:itcast:cdrwa,digest:itheima:qlzQzCLKhBROgh kooLvb+Mlwv4A=:cdrwa

ACL超级管理员

zookeeper的权限管理模式有一种叫做super,该模式提供一个超管可以方便的访问任何权限的节点

sh
# 生成密钥
echo -n super:admin | openssl dgst -binary -sha1 | openssl base64
# 修改/bin/zkServer.sh脚本文件
nohup $JAVA "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "- Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"
# 添加
"-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBv st5y6rkB6HQs="

addauth digest super:admin #添加认证用户

三、Java代码操作ZK

3.1 ZKClient连接zk

依赖

xml
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.9</version>
</dependency>

主要方法

java
//创建zookeeper对象
//连接zookeeper服务器
public ZooKeeper(
String connectString, // zooKeeper集合主机。多个以,号分隔
int sessionTimeout, 
Watcher watcher) throws IOException
{
this(connectString, sessionTimeout, watcher, false);
}

//断开ZooKeeper服务器连接
org.apache.zookeeper.ZooKeeper.close()
//   创建节点 
org.apache.zookeeper.ZooKeeper.create(
String, // znode路径
byte[], //存储在节点的数据
List<ACL>, //可以使用Ids.OPEN_ACL_UNSAFE 访问控制列表,可通过ZooDefs.Ids 来获取一些基本的acl列表。
CreateMode//CreateMode本身是一个枚举类型
    // 添加后标识为异步方式 方法都可添加
AsyncCallback.StatCallback callBack, Object ctx )//callBack-异步回调接口 ctx-传递上下文参数
// 获取节点上的数据
   org.apache.zookeeper.ZooKeeper.getData(
String, 
boolean, 
Stat)

// 查看子节点
// 同步方式 
getChildren(String path, boolean b) // b是否使用连接对象中注册的监视
// 异步方式 
getChildren(String path, boolean b,AsyncCallback.ChildrenCallback callBack,Object ctx)

//设置节点上的数据
    org.apache.zookeeper.ZooKeeper.setData(
String, 
byte[], 
int
AsyncCallback.StatCallback callBack, Object ctx
)//使用-1作为版本号的值时可以匹配所有版本号
//判断节点是否存在
 org.apache.zookeeper.ZooKeeper.exists(String, boolean)   // 是否使用连接对象中注册的监视器。

// 删除节点
// 同步方式 
delete(String path, int version) 
// 异步方式 
delete(String path, int version, AsyncCallback.VoidCallback callBack, Object ctx)

代码实现

java
// 1.连接Zookeeper服务器的信息
String connectString = "192.168.56.150:2181";
int sessionTimeout = 5000;
Watcher watcher = new Watcher() {@Override public void process(WatchedEvent event) {}};

// 2.创建ZooKeeper建立连接
ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);

// 3.通过设置节点值测试连接
byte[] data = "BBB".getBytes();

// -1表示不验证版本号
int version = -1;

Stat stat = zooKeeper.setData("/fruit", data, version);

System.out.println(stat.getVersion());

// 4.关闭连接
zooKeeper.close();

通知测试代码

java
// 一次性通知
// 1.准备连接Zookeeper服务器的连接信息
String connectingString = "192.168.56.100:2181";
int sessionTimeout = 5000;

// 2.创建Zookeeper对象
ZooKeeper zooKeeper = new ZooKeeper(connectingString, sessionTimeout, new Watcher() {
	public void process(WatchedEvent event) {}
});

// 3.获取指定节点的值
String path = "/fruit";
byte[] data = zooKeeper.getData(path, new Watcher() {
	
	@Override
	public void process(WatchedEvent event) {
		System.out.println("/fruit节点的值被修改了!"+Thread.currentThread().getName());
	}
}, new Stat());

// 4.将获取节点值时得到的字节数组转化成String类型
String dataString = new String(data);

// 5.打印结果
System.out.println(dataString);

// ※让程序不能停
while(true) {
	
	Thread.sleep(5000);
	
	System.err.println("药不能停,不能停……[代表程序原本要执行的业务逻辑功能……]"+Thread.currentThread().getName());
	
}

// 6.关闭连接
// zooKeeper.close();

持续性通知

java
// 持续性通知
// 专门封装一个方法用来实现持续的异步监听
public void getNodeData(String path, ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
	
	// 调用获取节点值的方法时设置Watcher进行监控
	byte[] data = zooKeeper.getData(path, new Watcher() {
		
		@Override
		public void process(WatchedEvent event) {
			
			// 接收到异步通知时,再次调用获取节点值方法,目的是设置新的Watcher
			try {
				getNodeData(path, zooKeeper);
			} catch (KeeperException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
		}
	}, new Stat());
	
	String result = new String(data);
	
	System.err.println("当前节点值="+result);
	
}

public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
	
	// 1.建立连接
	String connectingString = "192.168.56.100:2181";
	int sessionTimeout = 5000;
	
	ZooKeeper zooKeeper = new ZooKeeper(connectingString, sessionTimeout, new Watcher() {
		public void process(WatchedEvent event) {}
	});
	
	// 2.获取节点数据
	String path = "/fruit";
	new AsyncNoticeForever().getNodeData(path, zooKeeper);
	
	// 3.让方法持续运行
	while(true) {
		
		Thread.sleep(5000);
		
		System.err.println("药不能停,不能停……[代表程序原本要执行的业务逻辑功能……]"+Thread.currentThread().getName());
		
	}
}

3.2 Curator连接ZK

Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、 共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。其解决session会话超时重连 、watcher反复注册 、简化开发api 、遵循Fluent风格的API 、提供了分布式锁服务、共享计数器、缓存机制等机制 官网:http://curator.apache.org/

依赖

xml
 <!--curator-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

创建连接

java

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;

public class CuratorConnection {
    public static void main(String[] args) {
        // session重连策略
        /*
            3秒后重连一次,只重连1次
            RetryPolicy retryPolicy = new RetryOneTime(3000);
        */
        /*
            每3秒重连一次,重连3次
            RetryPolicy retryPolicy = new RetryNTimes(3,3000);
        */
       /*
            每3秒重连一次,总等待时间超过10秒后停止重连
            RetryPolicy retryPolicy=new RetryUntilElapsed(10000,3000);
       */
        // baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // 创建连接对象
        CuratorFramework client= CuratorFrameworkFactory.builder()
                // IP地址端口号
                .connectString("192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183")
                // 会话超时时间
                .sessionTimeoutMs(5000)
                // 重连机制
                .retryPolicy(retryPolicy)
                // 命名空间
                .namespace("create")
                // 构建连接对象
                .build();
        // 打开连接
        client.start();
        System.out.println(client.isStarted());
        // 关闭连接
        client.close();
    }
}

节点新增

java
// 新增节点
client.create()
	  // 递归节点的创建 多节点时使用 如:"/node3/node31
		.creatingParentsIfNeeded()
		// 节点的类型
		.withMode(CreateMode.PERSISTENT)
		// 节点的权限列表 world:anyone:cdrwa
		.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
	 // 异步回调接口 不使用为同步方式
		.inBackground(new BackgroundCallback() {
			 public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
				 // 节点的路径
				 System.out.println(curatorEvent.getPath());
				 // 时间类型
				 System.out.println(curatorEvent.getType());
			 }
})
		// arg1:节点的路径
		// arg2:节点的数据
		.forPath("/node1", "node1".getBytes());
System.out.println("结束");

更新节点

java
  // 更新节点
client.setData()
	// 指定版本号
		.withVersion(2)
	// 异步
	.inBackground(new BackgroundCallback() {
	public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
		// 节点的路径
		System.out.println(curatorEvent.getPath());
		// 事件的类型
		System.out.println(curatorEvent.getType());
	}
})
		// arg1:节点的路径
		// arg2:节点的数据
		.forPath("/node1", "node11".getBytes());
System.out.println("结束");

删除节点

java
// 删除节点
client.delete()
	// 删除包含子节点的节点
	 .deletingChildrenIfNeeded()
	// 版本号
		.withVersion(0)
	// 异步
	 .inBackground(new BackgroundCallback() {
			public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
				// 节点路径
				System.out.println(curatorEvent.getPath());
				// 事件类型
				System.out.println(curatorEvent.getType());
			}
		})
		// 节点的路径
		.forPath("/node1");
System.out.println("结束");

查看节点

java
// 读取节点数据
byte [] bys=client.getData()
	 // 读取属性
		.storingStatIn(stat)
		 // 节点的路径
		.forPath("/node1");
System.out.println(new String(bys));

// 异步方式读取节点的数据
client.getData()
		 .inBackground(new BackgroundCallback() {
			 public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
				 // 节点的路径
				 System.out.println(curatorEvent.getPath());
				 // 事件类型
				 System.out.println(curatorEvent.getType());
				 // 数据
				 System.out.println(new String(curatorEvent.getData()));
			 }
		 })
		.forPath("/node1");
Thread.sleep(5000);
System.out.println("结束");

查看子节点

java
 // 读取子节点数据
List<String> list = client.getChildren()
		// 节点路径
		.forPath("/get");
for (String str : list) {
	System.out.println(str);
}

// 异步方式读取子节点数据
client.getChildren()
		.inBackground(new BackgroundCallback() {
			public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
				// 节点路径
				System.out.println(curatorEvent.getPath());
				// 事件类型
				System.out.println(curatorEvent.getType());
				// 读取子节点数据
				List<String> list=curatorEvent.getChildren();
				for (String str : list) {
					System.out.println(str);
				}
			}
		})
		.forPath("/get");
Thread.sleep(5000);
System.out.println("结束");

检查节点是否存在

java
// 判断节点是否存在
Stat stat= client.checkExists()
		 // 节点路径
		.forPath("/node2");
System.out.println(stat.getVersion());

// 异步方式判断节点是否存在
client.checkExists()
		 .inBackground(new BackgroundCallback() {
			 public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
				 // 节点路径
				 System.out.println(curatorEvent.getPath());
				 // 事件类型
				 System.out.println(curatorEvent.getType());
				 System.out.println(curatorEvent.getStat().getVersion());
			 }
		 })
		.forPath("/node2");
Thread.sleep(5000);
System.out.println("结束");

watcherApi curator提供了两种Watcher(Cache)来监听结点的变化 Node Cache : 只是监听某一个特定的节点,监听节点的新增和修改 PathChildren Cache : 监控一个ZNode的子节点. 当一个子节点增加, 更新,删除 时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态

java
 // 监视某个节点的数据变化
// arg1:连接对象
// arg2:监视的节点路径
final NodeCache nodeCache=new NodeCache(client,"/watcher1");
// 启动监视器对象
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
	// 节点变化时回调的方法
	public void nodeChanged() throws Exception {
		System.out.println(nodeCache.getCurrentData().getPath());
		System.out.println(new String(nodeCache.getCurrentData().getData()));
	}
});
Thread.sleep(100000);
System.out.println("结束");
//关闭监视器对象
nodeCache.close();

// 监视子节点的变化
// arg1:连接对象
// arg2:监视的节点路径
// arg3:事件中是否可以获取节点的数据
PathChildrenCache pathChildrenCache=new PathChildrenCache(client,"/watcher1",true);
// 启动监听
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
	// 当子节点方法变化时回调的方法
	public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
		// 节点的事件类型
		System.out.println(pathChildrenCacheEvent.getType());
		// 节点的路径
		System.out.println(pathChildrenCacheEvent.getData().getPath());
		// 节点数据
		System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
	}
});
Thread.sleep(100000);
System.out.println("结束");
// 关闭监听
pathChildrenCache.close();

事务

java
 // 开启事务
client.inTransaction()
		.create().forPath("/node1","node1".getBytes())
		.and()
		.create().forPath("/node2","node2".getBytes())
		.and()
		//事务提交
		.commit();

分布式锁 InterProcessMutex:分布式可重入排它锁 InterProcessReadWriteLock:分布式读写锁

java
  // 排他锁
// arg1:连接对象
// arg2:节点路径
InterProcessLock interProcessLock = new InterProcessMutex(client, "/lock1");
System.out.println("等待获取锁对象!");
// 获取锁
interProcessLock.acquire();
for (int i = 1; i <= 10; i++) {
	Thread.sleep(3000);
	System.out.println(i);
}
// 释放锁
interProcessLock.release();
System.out.println("等待释放锁!");


// 读写锁
InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");
// 获取读锁对象
InterProcessLock interProcessLock=interProcessReadWriteLock.readLock();
System.out.println("等待获取锁对象!");
// 获取锁
interProcessLock.acquire();
for (int i = 1; i <= 10; i++) {
	Thread.sleep(3000);
	System.out.println(i);
}
// 释放锁
interProcessLock.release();
System.out.println("等待释放锁!");

// 读写锁
InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");
// 获取写锁对象
InterProcessLock interProcessLock=interProcessReadWriteLock.writeLock();
System.out.println("等待获取锁对象!");
// 获取锁
interProcessLock.acquire();
for (int i = 1; i <= 10; i++) {
	Thread.sleep(3000);
	System.out.println(i);
}
// 释放锁
interProcessLock.release();
System.out.println("等待释放锁!");

3.3 使用案例

配置中心示例

读取zookeeper中的配置信息,注册watcher监听器,存入本地变量

当zookeeper中的配置信息发生变化时,通过watcher的回调方法捕获数据变化事件

重新获取配置信息

java
import java.util.concurrent.CountDownLatch;

import com.itcast.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

public class MyConfigCenter implements Watcher {

    //  zk的连接串
    String IP = "192.168.60.130:2181";
    //  计数器对象
    CountDownLatch countDownLatch = new CountDownLatch(1);
    // 连接对象
    static ZooKeeper zooKeeper;

    // 用于本地化存储配置信息
    private String url;
    private String username;
    private String password;

    @Override
    public void process(WatchedEvent event) {
        try {
            // 捕获事件状态
            if (event.getType() == Event.EventType.None) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接成功");
                    countDownLatch.countDown();
                } else if (event.getState() == Event.KeeperState.Disconnected) {
                    System.out.println("连接断开!");
                } else if (event.getState() == Event.KeeperState.Expired) {
                    System.out.println("连接超时!");
                    // 超时后服务器端已经将连接释放,需要重新连接服务器端
                    zooKeeper = new ZooKeeper("192.168.60.130:2181", 6000,
                            new ZKConnectionWatcher());
                } else if (event.getState() == Event.KeeperState.AuthFailed) {
                    System.out.println("验证失败!");
                }
                // 当配置信息发生变化时
            } else if (event.getType() == EventType.NodeDataChanged) {
                initValue();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // 构造方法
    public MyConfigCenter() {
        initValue();
    }


    // 连接zookeeper服务器,读取配置信息
    public void initValue() {
        try {
            // 创建连接对象
            zooKeeper = new ZooKeeper(IP, 5000, this);
            // 阻塞线程,等待连接的创建成功
            countDownLatch.await();
            // 读取配置信息
            this.url = new String(zooKeeper.getData("/config/url", true, null));
            this.username = new String(zooKeeper.getData("/config/username", true, null));
            this.password = new String(zooKeeper.getData("/config/password", true, null));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }


    public static void main(String[] args) {
        try {
            MyConfigCenter myConfigCenter = new MyConfigCenter();
            for (int i = 1; i <= 20; i++) {
                Thread.sleep(5000);
                System.out.println("url:"+myConfigCenter.getUrl());
                System.out.println("username:"+myConfigCenter.getUsername());
                System.out.println("password:"+myConfigCenter.getPassword());
                System.out.println("########################################");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }


    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }


}
分布式唯一id

指定路径生成临时有序节点 取序列号及为分布式环境下的唯一ID

java
import java.util.concurrent.CountDownLatch;

import com.itcast.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class GloballyUniqueId implements Watcher {
    //  zk的连接串
    String IP = "192.168.60.130:2181";
    //  计数器对象
    CountDownLatch countDownLatch = new CountDownLatch(1);
    //  用户生成序号的节点
    String defaultPath = "/uniqueId";
    //  连接对象
    ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent event) {
        try {
            // 捕获事件状态
            if (event.getType() == Watcher.Event.EventType.None) {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.println("连接成功");
                    countDownLatch.countDown();
                } else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
                    System.out.println("连接断开!");
                } else if (event.getState() == Watcher.Event.KeeperState.Expired) {
                    System.out.println("连接超时!");
                    // 超时后服务器端已经将连接释放,需要重新连接服务器端
                    zooKeeper = new ZooKeeper(IP, 6000,
                            new ZKConnectionWatcher());
                } else if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
                    System.out.println("验证失败!");
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // 构造方法
    public GloballyUniqueId() {
        try {
            //打开连接
            zooKeeper = new ZooKeeper(IP, 5000, this);
            // 阻塞线程,等待连接的创建成功
            countDownLatch.await();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // 生成id的方法
    public String getUniqueId() {
        String path = "";
        try {
            //创建临时有序节点
            path = zooKeeper.create(defaultPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        // /uniqueId0000000001
        return path.substring(9);
    }

    public static void main(String[] args) {
        GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
        for (int i = 1; i <= 5; i++) {
            String id = globallyUniqueId.getUniqueId();
            System.out.println(id);
        }
    }

}
分布式锁

每个客户端往/Locks下创建临时有序节点/Locks/Lock 000000001 客户端取得/Locks下子节点,并进行排序,判断排在最前面的是否为自己,如果自己的 锁节点在第一位,代表获取锁成功 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点 Lock 000000001 当前一位锁节点(Lock 000000002)的逻辑 监听客户端重新执行第2步逻辑,判断自己是否获得了锁

java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class MyLock {
    //  zk的连接串
    String IP = "192.168.60.130:2181";
    //  计数器对象
    CountDownLatch countDownLatch = new CountDownLatch(1);
    //ZooKeeper配置信息
    ZooKeeper zooKeeper;
    private static final String LOCK_ROOT_PATH = "/Locks";
    private static final String LOCK_NODE_NAME = "Lock_";
    private String lockPath;

    // 打开zookeeper连接
    public MyLock() {
        try {
            zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.None) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            System.out.println("连接成功!");
                            countDownLatch.countDown();
                        }
                    }
                }
            });
            countDownLatch.await();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    //获取锁
    public void acquireLock() throws Exception {
        //创建锁节点
        createLock();
        //尝试获取锁
        attemptLock();
    }

    //创建锁节点
    private void createLock() throws Exception {
        //判断Locks是否存在,不存在创建
        Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
        if (stat == null) {
            zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // 创建临时有序节点
        lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("节点创建成功:" + lockPath);
    }

    //监视器对象,监视上一个节点是否被删除
    Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDeleted) {
                synchronized (this) {
                    notifyAll();
                }
            }
        }
    };

    //尝试获取锁
    private void attemptLock() throws Exception {
        // 获取Locks节点下的所有子节点
        List<String> list = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
        // 对子节点进行排序
        Collections.sort(list);
        // /Locks/Lock_000000001
        int index = list.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
        if (index == 0) {
            System.out.println("获取锁成功!");
            return;
        } else {
            // 上一个节点的路径
            String path = list.get(index - 1);
            Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
            if (stat == null) {
                attemptLock();
            } else {
                synchronized (watcher) {
                    watcher.wait();
                }
                attemptLock();
            }
        }

    }

    //释放锁
    public void releaseLock() throws Exception {
            //删除临时有序节点
            zooKeeper.delete(this.lockPath,-1);
            zooKeeper.close();
            System.out.println("锁已经释放:"+this.lockPath);
    }

    public static void main(String[] args) {
        try {
            MyLock myLock = new MyLock();
            myLock.createLock();
        } catch (Exception ex) {
            ex.printStackTrace();
        }

    }
}

四、核心知识

4.1 配置文件说明

properties
#通信心跳数,ZooKeeper服务器心跳时间,单位毫秒
#ZooKeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的
#时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒。
#用于心跳机制,并且设置最小的session超时时间为两倍心跳时间
#(session的最小超时时间是2*tickTime)。

tickTime=2000

#LF初始通信时限集群中的Follower跟随者服务器(F)与Leader领导者服务器
#(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)。
#投票选举新Leader的初始化时间,Follower在启动过程中,会从Leader同步
#所有最新数据,然后确定自己能够对外服务的起始状态。
#Leader允许Follower在initLimit时间内完成这个工作。

initLimit=10

#LF同步通信时限集群中Leader与Follower之间的最大响应时间单位,
#假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,
#从服务器列表中删除Follwer。
#在运行过程中,Leader负责与ZooKeeper集群中所有机器进行通信,
#例如通过一些心跳检测机制,来检测机器的存活状态。
#如果L发出心跳包在syncLimit之后,还没有从F那收到响应,
#那么就认为这个F已经不在线了。

syncLimit=5

#数据文件目录+数据持久化路径
#保存内存数据库快照信息的位置,如果没有其他说明,
#更新的事务日志也保存到数据库。

dataDir=/datatmp/zookeeper/data
dataLogDir=/datatmp/zookeeper/logs

#客户端连接端口
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html
#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

#2888,3888 are election port
server.1=zookeeper:2888:38888
#2888端口号是服务之间通信的端口,而3888是;zookeeper与其他应用程序通
#信的端口.而zookeeper是在hosts中已映射了本机的IP

server.A = B:C:D : 
#A表示这个是第几号服务器,
#B 是这个服务器的 ip 地址;
#C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
#D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader

4.2 事件监听机制

zookeeper提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对 象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变 等),会实时、主动通知所有订阅者 zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力。 watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式

Watcher架构

Watcher实现由三个部分组成: Zookeeper服务端 、Zookeeper客户端 、客户端的ZKWatchManager对象

客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端, 接着客户端的Watch管理器会触发相关Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程

Watcher特性
特性说明
一次性watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册
客户端顺序回调watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数
据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行
轻量级WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点
路径,并不会告诉数据节点变化前后的具体内容;
时效性watcher只有在当前session彻底失效时才会无效,若在session有效期内
快速重连成功,则watcher依然存在,仍可接收到通知;
Watcher通知状态**(KeeperState)

KeeperState是客户端与服务端连接状态发生变化时对应的通知类型。路径为org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,

枚举属性说明
SyncConnected客户端与服务器正常连接时
Disconnected客户端与服务器断开连接时
Expired会话session失效时
AuthFailed身份认证失败时
Watcher事件类型**(EventType)

EventType是数据节点(znode)发生变化时对应的通知类型。EventType变化时 KeeperState永远处于SyncConnected通知状态下;当KeeperState发生变化时, EventType永远为None。其路径为org.apache.zookeeper.Watcher.Event.EventType, 是一个枚举类

枚举属性说明
None
NodeCreatedWatcher监听的数据节点被创建时
NodeDeletedWatcher监听的数据节点被删除时
NodeDataChangedWatcher监听的数据节点内容发生变更时(无论内容数据 是否变化)
NodeChildrenChangedWatcher监听的数据节点的子节点列表发生变更时

客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用get等方法重新获取;

Watcher捕获相应事件

Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举类:KeeperState、EventType

建立zookeeper的watcher监听。在zookeeper中采用zk.getChildren(path, watch)、zk.exists(path, watch)、zk.getData(path, watcher, stat)

注册方式CreatedChildrenChangedChangedDeleted
zk.exists(“/node-x”,watcher)可监控可监控可监控
zk.getData(“/node-x”,watcher)可监控可监控
zk.getChildren(“/node-x”,watcher)可监控可监控
java
// 客服端与服务器的连接状态
//KeeperState 通知状态 
//SyncConnected:客户端与服务器正常连接时 
//Disconnected:客户端与服务器断开连接时 
//Expired:会话session失效时 
//AuthFailed:身份认证失败时 
//事件类型为:None

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZKConnectionWatcher implements Watcher {

    // 计数器对象
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    // 连接对象
    static ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent event) {
        try {
            // 事件类型
            if (event.getType() == Event.EventType.None) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接创建成功!");
                    countDownLatch.countDown();
                } else if (event.getState() == Event.KeeperState.Disconnected) {
                    System.out.println("断开连接!");
                } else if (event.getState() == Event.KeeperState.Expired) {
                    System.out.println("会话超时!");
                    zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
                } else if (event.getState() == Event.KeeperState.AuthFailed) {
                    System.out.println("认证失败!");
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

    }

    public static void main(String[] args) {
        try {
            zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
            // 阻塞线程等待连接的创建
            countDownLatch.await();
            // 会话id
            System.out.println(zooKeeper.getSessionId());
            // 添加授权用户
            zooKeeper.addAuthInfo("digest1","itcast1:1234561".getBytes());
            byte [] bs=zooKeeper.getData("/node1",false,null);
            System.out.println(new String(bs));
            Thread.sleep(50000);
            zooKeeper.close();
            System.out.println("结束");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }


}
java
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZKWatcherExists {

    String IP = "192.168.60.130:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接zookeeper客户端
        zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("连接对象的参数!");
                // 连接成功
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void watcherExists1() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:使用连接对象中的watcher
        zooKeeper.exists("/watcher1", true);
        Thread.sleep(50000);
        System.out.println("结束");
    }


    @Test
    public void watcherExists2() throws KeeperException, InterruptedException {
        // arg1:节点的路径
        // arg2:自定义watcher对象
        zooKeeper.exists("/watcher1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("自定义watcher");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        Thread.sleep(50000);
        System.out.println("结束");
    }

    @Test
    public void watcherExists3() throws KeeperException, InterruptedException {
        // watcher一次性
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    System.out.println("自定义watcher");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                    zooKeeper.exists("/watcher1", this);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        zooKeeper.exists("/watcher1", watcher);
        Thread.sleep(80000);
        System.out.println("结束");
    }


    @Test
    public void watcherExists4() throws KeeperException, InterruptedException {
        // 注册多个监听器对象
        zooKeeper.exists("/watcher1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("1");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        zooKeeper.exists("/watcher1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("2");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }
        });
        Thread.sleep(80000);
        System.out.println("结束");
    }
}

五、安装

5.1 使用docker部署zookeeper

#拉取zk镜像
docker pull zookeeper:3.5
#创建容器
docker create --name zk -p 2181:2181 zookeeper:3.5
#启动容器
docker start zk

5.2 常规安装方式

安装前提是要有java环境

  1. tar –xf zookeeper-3.4.6.tar.gz 解压文件到"/usr/local/zookeeper-3.4.6"
  2. 复制conf目录下的zoo_sample.cfg,并命名为zoo.cfg
  3. 修改zoo.cfg

5.3 集群安装

前置安装与常规安装方式一样,还需在每个zookeeper的 data 目录下创建一个 myid 文件,这个文件就是记录每个服务器的ID,以123为例

sh
echo 1 >/usr/local/zookeeper-cluster/zookeeper-1/data/myid
echo 2 >/usr/local/zookeeper-cluster/zookeeper-2/data/myid
echo 3 >/usr/local/zookeeper-cluster/zookeeper-3/data/myid

在每个zoo.cfg配置集群ip列表

sh
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

# server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口
server.1=192.168.149.135:2881:3881
server.2=192.168.149.135:2882:3882
server.3=192.168.149.135:2883:3883
# 启动后,leader是领导者,follower是跟随者

集群注意事项

  • 3个节点的集群,2个从服务器都挂掉,主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数
  • 当集群中的主服务器挂了,集群中的其他服务器会自动进行选举状态,然后产生新得leader
  • 当领导者产生后,再次有新服务器加入集群,不会影响到现任领导者

5.4 taokeeper监控工具的使用

基于zookeeper的监控管理工具taokeeper,由淘宝团队开源的zk管理中间件, 安装前要求服务前先配置nc 和 sshd

  1. 下载数据库脚本

    sh
    wget https://github.com/downloads/alibaba/taokeeper/taokeeper.sql
  2. 下载主程序

    sh
    wget https://github.com/downloads/alibaba/taokeeper/taokeeper- monitor.tar.gz
  3. 下载配置文件

    sh
    wget https://github.com/downloads/alibaba/taokeeper/taokeeper-monitor- config.properties
  4. 配置taokeeper-monitor-config.properties

    properties
    #Daily 
    systemInfo.envName=DAILY 
    #DBCP 
    dbcp.driverClassName=com.mysql.jdbc.Driver 
    #mysql连接的ip地址端口号 
    dbcp.dbJDBCUrl=jdbc:mysql://192.168.60.130:3306/taokeeper 
    dbcp.characterEncoding=GBK 
    #用户名 
    dbcp.username=root 
    #密码 
    dbcp.password=root 
    dbcp.maxActive=30 
    dbcp.maxIdle=10 
    dbcp.maxWait=10000 
    #SystemConstant 
    #用户存储内部数据的文件夹 
    #创建/home/zookeeper/taokeeperdata/ZooKeeperClientThroughputStat 
    SystemConstent.dataStoreBasePath=/home/zookeeper/taokeeperdata 
    #ssh用户 
    SystemConstant.userNameOfSSH=zookeeper 
    #ssh密码 
    SystemConstant.passwordOfSSH=zookeeper 
    #Optional 
    SystemConstant.portOfSSH=22
5. 安装配置tomcat,修改catalina.sh
   ```sh
   #指向配置文件所在的位置 
   JAVA_OPTS=-DconfigFilePath="/home/zookeeper/taokeeper-monitor- tomcat/webapps/ROOT/conf/taokeeper-monitor-config.properties"
  1. 启动